yavsc/src/Yavsc/ViewModels/Streaming/LiveCastHandler.cs

93 lines
2.9 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
6 years ago
using Yavsc.Helpers;
using Yavsc.Models;
using Yavsc.Models.FileSystem;
namespace Yavsc.ViewModels.Streaming
{
5 years ago
/// <summary>
/// Pairs a user name with a web socket.
/// </summary>
/// <remarks>
/// NOTE(review): presumably describes one connected participant of a live cast;
/// confirm against the code that instantiates it.
/// </remarks>
public class LiveCastClient
{
    /// <summary>Name of the user this entry refers to.</summary>
    public string UserName { get; set; }

    /// <summary>Web socket associated with <see cref="UserName"/>.</summary>
    public WebSocket Socket { get; set; }
}
5 years ago
/// <summary>
/// View model carrying a user name together with a flow identifier.
/// </summary>
/// <remarks>
/// NOTE(review): presumably identifies one live flow and its owner for display;
/// confirm against the views/controllers that consume it.
/// </remarks>
public class LiveEntryViewModel
{
    /// <summary>Name of the user this entry refers to.</summary>
    public string UserName { get; set; }

    /// <summary>Identifier of the flow, kept as a string.</summary>
    public string FlowId { get; set; }
}
/// <summary>
/// Holds the state attached to one live cast: a web socket, a concurrent map of
/// listener sockets and a cancellation token source. Also provides
/// <see cref="ReceiveUserFile"/>, which drains queued byte chunks into a file
/// while keeping the user's disk usage up to date.
/// </summary>
public class LiveCastHandler : IDisposable
{
    /// <summary>
    /// Web socket attached to this handler.
    /// NOTE(review): presumably the broadcaster's socket; confirm against callers.
    /// </summary>
    public WebSocket Socket { get; set; }

    /// <summary>
    /// Listener sockets indexed by a string key.
    /// NOTE(review): the key is presumably the listener's user name; confirm against callers.
    /// </summary>
    public ConcurrentDictionary<string, WebSocket> Listeners { get; set; } = new ConcurrentDictionary<string, WebSocket>();

    /// <summary>
    /// Cancellation token source created together with this handler.
    /// </summary>
    public CancellationTokenSource TokenSource { get; set; } = new CancellationTokenSource();

    /// <summary>
    /// Releases the cancellation token source held by this handler.
    /// </summary>
    /// <remarks>
    /// The sockets are deliberately left untouched: this class does not create them,
    /// so closing them is left to whoever did.
    /// </remarks>
    public void Dispose()
    {
        // The property has a public setter, so it may have been reset to null.
        TokenSource?.Dispose();
    }

    /// <summary>
    /// Writes the chunks found in <paramref name="queue"/> to a file under
    /// <paramref name="root"/> until <paramref name="isEndOfInput"/> reports the end
    /// of the input and the queue is empty, or until the user's disk quota is reached.
    /// </summary>
    /// <param name="user">User whose <c>DiskUsage</c> is read, compared with <c>DiskQuota</c> and updated.</param>
    /// <param name="logger">Receives informational progress messages.</param>
    /// <param name="root">Destination directory; also stored in the result's <c>DestDir</c>.</param>
    /// <param name="queue">
    /// Pending chunks. The queue is polled every 100 ms while it is empty.
    /// NOTE(review): <c>Queue</c> is not thread-safe; if another thread enqueues
    /// while this method dequeues, callers must synchronize access.
    /// </param>
    /// <param name="destFileName">Requested file name; it is passed through <c>AbstractFileSystemHelpers.FilterFileName</c> before use.</param>
    /// <param name="isEndOfInput">Returns <c>true</c> once no more chunks will be queued.</param>
    /// <returns>
    /// Information about the received file: its filtered name, the destination
    /// directory, whether an existing file was overwritten (<c>Overriden</c>) and
    /// whether the resulting usage reached the quota (<c>QuotaOffensed</c>).
    /// </returns>
    public async Task<FileRecievedInfo> ReceiveUserFile(ApplicationUser user, ILogger logger, string root, Queue<ArraySegment<byte>> queue, string destFileName, Func<bool> isEndOfInput)
    {
        // TODO lock user's disk usage for this scope,
        // this process is not safe at concurrent access.
        long usage = user.DiskUsage;

        var item = new FileRecievedInfo
        {
            FileName = AbstractFileSystemHelpers.FilterFileName(destFileName),
            DestDir = root
        };

        var fi = new FileInfo(Path.Combine(root, item.FileName));
        if (fi.Exists)
        {
            // The existing file is about to be truncated: give its size back first.
            item.Overriden = true;
            usage -= fi.Length;
        }

        logger.LogInformation("Opening the file");
        using (var dest = fi.Open(FileMode.Create, FileAccess.Write, FileShare.Read))
        {
            logger.LogInformation("Appening to file");
            try
            {
                while (!isEndOfInput() || queue.Count > 0)
                {
                    if (queue.Count > 0)
                    {
                        var buffer = queue.Dequeue();
                        // Report the segment length, not the size of its backing array.
                        logger.LogInformation("writing {Count} bytes...", buffer.Count);
                        await dest.WriteAsync(buffer.Array, buffer.Offset, buffer.Count);
                        logger.LogInformation("done.");
                        usage += buffer.Count;
                    }

                    // Checked after the write, so the chunk that crosses the quota is still stored.
                    if (usage >= user.DiskQuota)
                    {
                        break;
                    }

                    if (queue.Count == 0 && !isEndOfInput())
                    {
                        // Nothing to write yet: wait for the producer.
                        await Task.Delay(100);
                    }
                }
            }
            finally
            {
                // The file has already been truncated at this point, so record what
                // was actually written even when the loop is left by an exception.
                user.DiskUsage = usage;
            }
        }

        if (usage >= user.DiskQuota)
        {
            item.QuotaOffensed = true;
        }

        return item;
    }
}
}