yavsc/src/cli/Commands/Streamer.cs

113 lines
4.4 KiB
C#

7 years ago
using System;
using System.IO;
7 years ago
using System.Net.WebSockets;
7 years ago
using System.Threading;
using System.Threading.Tasks;
using cli.Model;
using Microsoft.Extensions.CommandLineUtils;
7 years ago
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.OptionsModel;
using Yavsc;
7 years ago
7 years ago
namespace cli {
7 years ago
/// <summary>
/// Command line "stream" command: reads binary data from a file or from
/// standard input and sends it over a web socket to the configured
/// streaming URL, using the flow id argument as the last path segment.
/// </summary>
public class Streamer: ICommander {
    private readonly ClientWebSocket _client;
    private readonly ILogger _logger;
    private readonly ConnectionSettings _cxSettings;
    private readonly UserConnectionSettings _userCxSettings;

    // Assigned when the "stream" command is configured in Integrate().
    private CommandOption _fileOption;
    private CommandArgument _flowIdArg;

    // Created anew at the start of each DoStream() call.
    private CancellationTokenSource _tokenSource;

    /// <summary>
    /// Builds the command and prepares the web socket client, attaching the
    /// user's access token as a bearer "Authorization" request header.
    /// </summary>
    /// <param name="loggerFactory">Factory used to create this command's logger.</param>
    /// <param name="cxSettings">Connection settings (provides the streaming URL).</param>
    /// <param name="userCxSettings">User connection settings (provides the access token).</param>
    public Streamer(ILoggerFactory loggerFactory,
        IOptions<ConnectionSettings> cxSettings,
        IOptions<UserConnectionSettings> userCxSettings
    )
    {
        _logger = loggerFactory.CreateLogger<Streamer>();
        _cxSettings = cxSettings.Value;
        _userCxSettings = userCxSettings.Value;
        _client = new ClientWebSocket();
        _client.Options.SetRequestHeader("Authorization", $"Bearer {_userCxSettings.AccessToken}");
    }

    /// <summary>
    /// Registers the "stream" sub-command, its "--file" option and its
    /// "flowId" argument on the given application.
    /// </summary>
    /// <param name="rootApp">Application the sub-command is added to.</param>
    /// <returns>The newly created "stream" sub-command.</returns>
    public CommandLineApplication Integrate(CommandLineApplication rootApp)
    {
        CommandLineApplication streamCmd = rootApp.Command("stream",
            (target) =>
            {
                target.FullName = "Stream to server";
                target.Description = "Stream arbitrary binary data to your server channel";
                _fileOption = target.Option("-f | --file", "use given file as input stream", CommandOptionType.SingleValue);
                _flowIdArg = target.Argument("flowId", "Remote Id for this channel", false);
                target.HelpOption("-? | -h | --help");
            });
        streamCmd.OnExecute(async() => await DoExecute());
        return streamCmd;
    }

    /// <summary>
    /// Selects the input (the file given with "--file", otherwise standard
    /// input) and streams it to the server.
    /// </summary>
    /// <returns>0 once streaming completed, -2 when the given input file does not exist.</returns>
    private async Task <int> DoExecute()
    {
        if (!_fileOption.HasValue())
        {
            using (var stdin = Console.OpenStandardInput())
            {
                _logger.LogInformation("DoExecute from standard input");
                await DoStream(stdin);
            }
            return 0;
        }

        var inputFile = new FileInfo(_fileOption.Value());
        if (!inputFile.Exists)
        {
            _logger.LogError("Input file doesn´t exist.");
            return -2;
        }

        using (var fileStream = inputFile.OpenRead())
        {
            _logger.LogInformation("DoExecute from given file");
            await DoStream(fileStream);
        }
        return 0;
    }

    /// <summary>
    /// Connects the web socket to "{StreamingUrl}/{flowId}", forwards the
    /// content of <paramref name="stream"/> as binary frames, then closes
    /// the socket with a normal closure status.
    /// </summary>
    /// <param name="stream">Readable stream providing the bytes to send.</param>
    private async Task DoStream (Stream stream)
    {
        _tokenSource = new CancellationTokenSource();
        var url = _cxSettings.StreamingUrl+"/"+_flowIdArg.Value;
        _logger.LogInformation("Connecting to "+url);
        await _client.ConnectAsync(new Uri(url), _tokenSource.Token);
        _logger.LogInformation("Connected");

        const int bufLen = Constants.WebSocketsMaxBufLen;
        byte [] buffer = new byte[bufLen];
        int read;

        // NOTE: frames coming back from the server are not read here.
        do {
            read = await stream.ReadAsync(buffer, 0, bufLen);
            // A read shorter than the buffer is flagged as the end of the message.
            bool end = read < bufLen;
            // Send only the bytes actually read: the rest of the buffer
            // still holds stale data from previous iterations.
            var segment = new ArraySegment<byte>(buffer, 0, read);
            await _client.SendAsync(segment, WebSocketMessageType.Binary, end, _tokenSource.Token);
            _logger.LogInformation($"sent {read} bytes end:{end} ");
        } while (read>0 && stream.CanRead );

        await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "EOF", _tokenSource.Token);
    }
}
7 years ago
}